Global Plots from MOM and POP Output¶
Imports¶
from prefect import task, Flow, Parameter
from prefect.executors import DaskExecutor
import intake
from ecgtools import Builder
from ecgtools.parsers.cesm import parse_cesm_timeseries, parse_cesm_history
from ncar_jobqueue import NCARCluster
from distributed import Client
import xcollection
import hvplot
import hvplot.xarray
import cmocean
import cartopy.crs as ccrs
import numpy as np
import xarray as xr
import holoviews as hv
import panel as pn
import calc
hv.extension('bokeh')
/glade/u/home/mgrover/.local/lib/python3.9/site-packages/dask_jobqueue/core.py:19: FutureWarning: format_bytes is deprecated and will be removed in a future release. Please use dask.utils.format_bytes instead.
from distributed.utils import format_bytes, parse_bytes, tmpfile
/glade/u/home/mgrover/.local/lib/python3.9/site-packages/dask_jobqueue/core.py:19: FutureWarning: parse_bytes is deprecated and will be removed in a future release. Please use dask.utils.parse_bytes instead.
from distributed.utils import format_bytes, parse_bytes, tmpfile
/glade/u/home/mgrover/.local/lib/python3.9/site-packages/dask_jobqueue/htcondor.py:6: FutureWarning: parse_bytes is deprecated and will be removed in a future release. Please use dask.utils.parse_bytes instead.
from distributed.utils import parse_bytes
Build the Data Catalogs¶
MOM Catalog¶
# History-file directory for the MOM run being cataloged
mom_paths = ['/glade/scratch/mlevy/archive/mom_oob.003/ocn/hist']
# depth=0: scan only the listed directories, no recursion
mom_builder = Builder(mom_paths, depth=0)
# Map each MOM history stream name to its component/frequency metadata;
# these streams are not in ecgtools' default frequency definitions
mom_stream_info = {
'mom6.h_bgc_monthly_z':{'component':'ocn', 'frequency':'month_1'},
'mom6.h_bgc_monthly':{'component':'ocn', 'frequency':'month_1'},
'mom6.hm':{'component':'ocn', 'frequency':'month_1'},
'mom6.static':{'component':'ocn', 'frequency':'month_1'}
# add hm bgc stream
}
# Parse the history files, passing the custom stream dictionary to the parser
mom_builder.build(parse_cesm_history, parsing_func_kwargs={'user_streams_dict': mom_stream_info})
[Parallel(n_jobs=-1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=-1)]: Done 1 out of 1 | elapsed: 0.0s remaining: 0.0s
[Parallel(n_jobs=-1)]: Done 1 out of 1 | elapsed: 0.0s finished
[Parallel(n_jobs=-1)]: Using backend SequentialBackend with 1 concurrent workers.
/glade/work/mgrover/miniconda3/envs/cesm-collections-dev/lib/python3.9/site-packages/ecgtools/parsers/cesm.py:158: UserWarning: Using the default frequency definitions
warnings.warn('Using the default frequency definitions')
[Parallel(n_jobs=-1)]: Done 1 out of 1 | elapsed: 2.1s remaining: 0.0s
[Parallel(n_jobs=-1)]: Done 2 out of 2 | elapsed: 2.2s remaining: 0.0s
[Parallel(n_jobs=-1)]: Done 3 out of 3 | elapsed: 2.4s remaining: 0.0s
[Parallel(n_jobs=-1)]: Done 4 out of 4 | elapsed: 2.5s remaining: 0.0s
[Parallel(n_jobs=-1)]: Done 1741 out of 1741 | elapsed: 5.8min finished
/glade/work/mgrover/miniconda3/envs/cesm-collections-dev/lib/python3.9/site-packages/ecgtools/builder.py:191: UserWarning: Unable to parse 760 assets/files. A list of these assets can be found in `.invalid_assets` attribute.
self.get_directories().get_filelist()._parse(
Builder(root_path=[PosixPath('/glade/scratch/mlevy/archive/mom_oob.003/ocn/hist')], extension='.nc', depth=0, exclude_patterns=None, njobs=-1)
# Persist the MOM catalog to disk (a CSV of assets plus a JSON descriptor,
# readable later with intake.open_esm_datastore)
mom_builder.save(
    # File path - could save as .csv (uncompressed csv) or .csv.gz (compressed csv)
    "../data/cesm-validation-mom-pop-vertical-grid.csv",
    # Column name including filepath
    path_column_name='path',
    # Column name including variables
    variable_column_name='variables',
    # Data file format - could be netcdf or zarr (in this case, netcdf)
    data_format="netcdf",
    # Which attributes to groupby when reading in variables using intake-esm
    groupby_attrs=["component", "stream", "case"],
    # Aggregations which are fed into xarray when reading in data using intake
    aggregations=[
        {
            "type": "join_existing",
            "attribute_name": "date",
            "options": {"dim": "time", "coords": "minimal", "compat": "override"},
        }
    ],
)
Saved catalog location: ../data/cesm-validation-mom-pop-vertical-grid.json and ../data/cesm-validation-mom-pop-vertical-grid.csv
/glade/scratch/mgrover/ipykernel_107241/3445297225.py:1: UserWarning: Unable to parse 760 assets/files. A list of these assets can be found in ../data/invalid_assets_cesm-validation-mom-pop-vertical-grid.csv.
mom_builder.save(
mom_catalog = intake.open_esm_datastore('../data/cesm-validation-mom-pop-vertical-grid.json')
POP Catalog¶
# History-file directory for the POP run being cataloged
pop_paths = ['/glade/scratch/mlevy/archive/pop_no_mcog/ocn/hist']
pop_builder = Builder(pop_paths, depth=0)
# Empty dict: fall back to ecgtools' default POP stream/frequency definitions
pop_stream_info = {}
pop_builder.build(parse_cesm_history, parsing_func_kwargs={'user_streams_dict': pop_stream_info})
[Parallel(n_jobs=-1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=-1)]: Done 1 out of 1 | elapsed: 0.0s remaining: 0.0s
[Parallel(n_jobs=-1)]: Done 1 out of 1 | elapsed: 0.0s finished
[Parallel(n_jobs=-1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=-1)]: Done 1 out of 1 | elapsed: 0.3s remaining: 0.0s
[Parallel(n_jobs=-1)]: Done 2 out of 2 | elapsed: 0.7s remaining: 0.0s
[Parallel(n_jobs=-1)]: Done 3 out of 3 | elapsed: 1.1s remaining: 0.0s
[Parallel(n_jobs=-1)]: Done 4 out of 4 | elapsed: 1.4s remaining: 0.0s
[Parallel(n_jobs=-1)]: Done 742 out of 742 | elapsed: 5.8min finished
/glade/work/mgrover/miniconda3/envs/cesm-collections-dev/lib/python3.9/site-packages/ecgtools/builder.py:191: UserWarning: Unable to parse 1 assets/files. A list of these assets can be found in `.invalid_assets` attribute.
self.get_directories().get_filelist()._parse(
Builder(root_path=[PosixPath('/glade/scratch/mlevy/archive/pop_no_mcog/ocn/hist')], extension='.nc', depth=0, exclude_patterns=None, njobs=-1)
# Persist the POP catalog to disk, mirroring the MOM catalog settings above
pop_builder.save(
    # File path - could save as .csv (uncompressed csv) or .csv.gz (compressed csv)
    "../data/cesm-validation-pop.csv",
    # Column name including filepath
    path_column_name='path',
    # Column name including variables
    variable_column_name='variables',
    # Data file format - could be netcdf or zarr (in this case, netcdf)
    data_format="netcdf",
    # Which attributes to groupby when reading in variables using intake-esm
    groupby_attrs=["component", "stream", "case"],
    # Aggregations which are fed into xarray when reading in data using intake
    aggregations=[
        {
            "type": "join_existing",
            "attribute_name": "date",
            "options": {"dim": "time", "coords": "minimal", "compat": "override"},
        }
    ],
)
Saved catalog location: ../data/cesm-validation-pop.json and ../data/cesm-validation-pop.csv
/glade/scratch/mgrover/ipykernel_107241/3824853795.py:1: UserWarning: Unable to parse 1 assets/files. A list of these assets can be found in ../data/invalid_assets_cesm-validation-pop.csv.
pop_builder.save(
Generic Catalog Building Prefect¶
@task
def build_catalog(data_directory_paths, output_catalog_path, file_type, stream_info, depth=0):
    """Build an ecgtools catalog for CESM output and return the builder.

    Parameters
    ----------
    data_directory_paths : list of str
        Directories to scan for model output files.
    output_catalog_path : str
        Destination path for the catalog (currently unused here; saving is
        left to the caller).
    file_type : str
        Either ``'timeseries'`` or ``'history'``, selecting the CESM parser.
    stream_info : dict
        User stream definitions forwarded to the parser.
    depth : int, optional
        Directory recursion depth for the Builder (default 0).

    Returns
    -------
    ecgtools.Builder
        The builder after parsing, so downstream tasks can save/inspect it.

    Raises
    ------
    ValueError
        If ``file_type`` is not a supported value.  (The original version
        fell through silently and then crashed with a NameError on
        ``parser``.)
    """
    # Setup the Builder
    catalog_builder = Builder(data_directory_paths, depth=depth)
    if file_type == 'timeseries':
        parser = parse_cesm_timeseries
    elif file_type == 'history':
        parser = parse_cesm_history
    else:
        raise ValueError(
            f"Unknown file_type {file_type!r}; expected 'timeseries' or 'history'"
        )
    # Build the Catalog
    catalog_builder.build(parser, parsing_func_kwargs={'user_streams_dict': stream_info})
    return catalog_builder
Read in the Data¶
Spin up a Cluster¶
# Request an HPC batch cluster (dask-jobqueue via ncar-jobqueue) and ask for
# 10 workers; workers arrive asynchronously as the scheduler grants jobs
cluster = NCARCluster()
cluster.scale(10)
# Attach a distributed client so subsequent dask operations use the cluster
client = Client(cluster)
client
Client
Client-37153701-376e-11ec-9850-3cecef1b11e4
| Connection method: Cluster object | Cluster type: dask_jobqueue.PBSCluster |
| Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/8787/status |
Cluster Info
PBSCluster
7b42e724
| Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/8787/status | Workers: 0 |
| Total threads: 0 | Total memory: 0 B |
Scheduler Info
Scheduler
Scheduler-9cf3de9d-d35a-46e7-bbeb-04dc1ad8cc45
| Comm: tcp://10.12.206.57:39346 | Workers: 0 |
| Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/8787/status | Total threads: 0 |
| Started: Just now | Total memory: 0 B |
Workers
def temporal_average(ds):
    """Average *ds* over its 'time' dimension, collapsing that dimension."""
    averaged = ds.mean(dim='time')
    return averaged
def yearly_average(ds, model):
    """Return the annual climatology of *ds*.

    NOTE(review): relies on ``center_time`` and ``geocat.comp`` which do not
    appear in this file's visible imports -- confirm they are available in
    the runtime environment (or should be imported at the top).
    """
    if model == 'pop':
        # POP-specific time re-centering before climatology
        # (assumes center_time shifts timestamps to interval midpoints --
        # TODO confirm against its definition)
        ds = center_time(ds)
    return geocat.comp.climatologies.climatology(ds, "year", "time")
def global_average(ds, configuration_dict):
    """Area-weighted global mean of *ds*, normalized by the total area.

    *configuration_dict* supplies the grid metadata keys:
    'horizontal_dims', 'area_field', and 'land_sea_mask'.
    """
    return calc.global_mean(
        ds,
        configuration_dict['horizontal_dims'],
        configuration_dict['area_field'],
        configuration_dict['land_sea_mask'],
        normalize=True,
    )
def global_integral(ds, configuration_dict):
    """Area-weighted global integral of *ds* (unnormalized global mean).

    *configuration_dict* supplies the grid metadata keys:
    'horizontal_dims', 'area_field', and 'land_sea_mask'.
    """
    return calc.global_mean(
        ds,
        configuration_dict['horizontal_dims'],
        configuration_dict['area_field'],
        configuration_dict['land_sea_mask'],
        normalize=False,
    )
def add_cyclic_point(ds):
    """
    Add a cyclic point to POP model output dataset

    Shifts the curvilinear longitudes onto a contiguous range and appends a
    duplicate first column so plotting libraries do not leave a seam at the
    dateline.

    Parameters
    ----------
    ds : xarray.Dataset
        POP output dataset

    Returns
    -------
    dso : xarray.Dataset
        modified POP model output dataset with cyclic point added
    """
    # Number of longitude columns; xL/xR select an ni-wide window from the
    # doubled longitude array, shifted by half a grid
    ni = ds.TLONG.shape[1]
    xL = int(ni / 2 - 1)
    xR = int(xL + ni)
    tlon = ds.TLONG.data
    tlat = ds.TLAT.data
    # Make longitudes monotonic before duplicating the array
    tlon = np.where(np.greater_equal(tlon, min(tlon[:, 0])), tlon - 360.0, tlon)
    lon = np.concatenate((tlon, tlon + 360.0), 1)
    lon = lon[:, xL:xR]
    if ni == 320:
        # Hard-coded fix-up for the 320-column grid near the top rows
        # (assumes the standard POP gx1 layout -- TODO confirm row 367
        # for other 320-column grids)
        lon[367:-3, 0] = lon[367:-3, 0] + 360.0
    lon = lon - 360.0
    # Append the cyclic column: first column shifted by +360 degrees
    lon = np.hstack((lon, lon[:, 0:1] + 360.0))
    if ni == 320:
        lon[367:, -1] = lon[367:, -1] - 360.0
    # Trick cartopy into doing the right thing:
    # Cartopy gets confused when the cyclic coords are identical
    lon[:, 0] = lon[:, 0] - 1e-8
    # Periodicity
    lat = np.concatenate((tlat, tlat), 1)
    lat = lat[:, xL:xR]
    lat = np.hstack((lat, lat[:, 0:1]))
    TLAT = xr.DataArray(lat, dims=('nlat', 'nlon'))
    TLONG = xr.DataArray(lon, dims=('nlat', 'nlon'))
    dso = xr.Dataset({'TLAT': TLAT, 'TLONG': TLONG})
    # Copy vars
    varlist = [v for v in ds.data_vars if v not in ['TLAT', 'TLONG']]
    for v in varlist:
        v_dims = ds[v].dims
        if not ('nlat' in v_dims and 'nlon' in v_dims):
            # Variable is not on the horizontal grid: copy through unchanged
            dso[v] = ds[v]
        else:
            # Determine and sort other dimensions
            other_dims = set(v_dims) - {'nlat', 'nlon'}
            other_dims = tuple([d for d in v_dims if d in other_dims])
            lon_dim = ds[v].dims.index('nlon')
            field = ds[v].data
            # Apply the same duplicate / window / append-cyclic-column
            # sequence to the data that was applied to the coordinates
            field = np.concatenate((field, field), lon_dim)
            field = field[..., :, xL:xR]
            field = np.concatenate((field, field[..., :, 0:1]), lon_dim)
            dso[v] = xr.DataArray(field, dims=other_dims + ('nlat', 'nlon'), attrs=ds[v].attrs)
    # Copy coords
    for v, da in ds.coords.items():
        if not ('nlat' in da.dims and 'nlon' in da.dims):
            dso = dso.assign_coords(**{v: da})
    return dso
def grab_static_data(catalog):
    """Open the MOM static (grid metadata) file referenced by *catalog*.

    Looks up the 'mom6.static' stream in the catalog and opens the first
    matching file path directly with xarray.
    """
    static_path = catalog.search(stream='mom6.static').df.path.values[0]
    return xr.open_dataset(static_path)
@task
def read_catalog(path, csv_kwargs):
    """Open the intake-esm datastore at *path*; *csv_kwargs* configure CSV parsing."""
    datastore = intake.open_esm_datastore(path, csv_kwargs=csv_kwargs)
    return datastore
@task
def subset_catalog(catalog, search_dict):
    """Return the subset of *catalog* matching the query in *search_dict*."""
    subset = catalog.search(**search_dict)
    return subset
@task
def subset_dates(catalog, date_subset):
    """Restrict *catalog* to the date entries selected by the slice *date_subset*."""
    selected_dates = catalog.df.date[date_subset].values
    return catalog.search(date=selected_dates)
@task
def convert_to_collection(dsets):
    """Wrap a dict of xarray datasets in an xcollection.Collection."""
    collection = xcollection.Collection(dsets)
    return collection
@task
def load_data(catalog, cdf_kwargs):
    """Load *catalog* into a dict of datasets; *cdf_kwargs* pass through to xarray."""
    dsets = catalog.to_dataset_dict(cdf_kwargs=cdf_kwargs)
    return dsets
@task
def long_term_mean(collection):
    """Apply the time-mean reduction to every dataset in *collection*."""
    averaged = collection.map(temporal_average)
    return averaged
@task
def annual_mean(collection, model):
    """Apply the yearly-average reduction to every dataset in *collection*."""
    annual = collection.map(yearly_average, model)
    return annual
@task
def global_mean(collection, configuration_dict):
    """Apply the area-normalized global mean to every dataset in *collection*."""
    means = collection.map(global_average, configuration_dict)
    return means
@task
def global_integral(collection, configuration_dict, _integral_func=global_integral):
    """Apply the global-integral reduction to every dataset in *collection*.

    Bug fix: this task's name shadows the module-level ``global_integral``
    function, so referencing that name inside the body would resolve (at call
    time) to the Task object itself -- mapping the task rather than the
    computation.  Binding the plain function as a default argument captures
    the correct callable at definition time, before the name is rebound.
    """
    return collection.map(_integral_func, configuration_dict)
@task
def fix_grid(collection):
    # NOTE(review): despite the name, this maps ``temporal_average`` and is
    # byte-identical to ``long_term_mean`` above -- it looks like a
    # copy-paste placeholder.  No grid-fixing function is defined in this
    # file; confirm the intended callable before using this task.
    return collection.map(temporal_average)
# Example per-model analysis configuration.  Fixes the original cell, which
# raised SyntaxError: 'mean_variables' used ['TEMP':{...}] (a colon inside a
# list literal).  Entries are now single-key dicts, matching the
# 'integral_variables' pattern; 'Tg/year' is a plain string rather than the
# original one-element set.
test = {
    'mom': {
        # Variables to globally integrate, with desired output units
        'integral_variables': [{'FG_CO2': {'out_units': 'Tg/year'}}],
        # Variables to globally average, with desired output units
        'mean_variables': [{'TEMP': {'out_units': 'degC'}}],
        # Horizontal dimensions, cell-area field, and land/sea mask on the
        # MOM tracer grid
        'horizontal_dims': ('yh', 'xh'),
        'area_field': 'area_t',
        'land_sea_mask': 'wet',
    }
}
File "/glade/scratch/mgrover/ipykernel_198907/1801437483.py", line 2
'mean_variables':['TEMP':{'out_units':'degC'}],
^
SyntaxError: invalid syntax
import ast

with Flow('ocean-diagnostics') as flow:
    # Setup our parameters
    path = Parameter('path', default='../data/cesm-validation-pop.json')
    # The catalog's 'variables' column is a stringified list; parse it safely
    csv_kwargs = Parameter('csv_kwargs', default={"converters": {"variables": ast.literal_eval}})
    search_dict = Parameter('search_dict', default={})
    # Default: the last 48 date entries (4 years of monthly output)
    subset_times = Parameter('subset_times', default=slice(-48, -1))
    cdf_kwargs = Parameter('cdf_kwargs', default={'chunks':{}})
    # NOTE(review): model and configuration_dict feed no task while the
    # annual/global steps below are commented out -- this is the source of
    # the "Tasks were created but not added to the flow" warning at run time
    model = Parameter('model', default='pop')
    configuration_dict = Parameter('configuration_dict', default={})
    # Setup our functions/tasks
    cat = read_catalog(path, csv_kwargs)
    # Subset the catalog
    cat_subset = subset_catalog(cat, search_dict)
    # Add the date subset
    cat_subset = subset_dates(cat_subset, subset_times)
    # Call to dataset dict
    dsets = load_data(cat_subset, cdf_kwargs=cdf_kwargs)
    # Convert to an xcollection
    collection = convert_to_collection(dsets)
    # Calculate long-term mean
    long_term_average = long_term_mean(collection)
    # Calculate the annual mean
    #annual_average = annual_mean(collection, model)
    # Calculate the global mean and global integral
    #mean_globe = global_mean(annual_average, configuration_dict)
    #integral_globe = global_integral(annual_average, configuration_dict)
/glade/work/mgrover/miniconda3/envs/cesm-collections-dev/lib/python3.9/contextlib.py:124: UserWarning: Tasks were created but not added to the flow: {<Parameter: model>, <Parameter: configuration_dict>}. This can occur when `Task` classes, including `Parameters`, are instantiated inside a `with flow:` block but not added to the flow either explicitly or as the input to another task. For more information, see https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows.
next(self.gen)
flow.visualize()
Execute the Workflow¶
# Run prefect tasks on the existing Dask cluster rather than locally
executor = DaskExecutor(address=client.scheduler.address)
# BGC tracers to compare between the two models
variables = ['O2', 'PO4', 'SiO3', 'NO3']
# Execute the workflow against the POP catalog: monthly pop.h stream,
# restricted to the tracer variables above
pop_result = flow.run(path = '../data/cesm-validation-pop.json',
                      search_dict={'stream':'pop.h',
                                   'frequency':'month_1',
                                   'variables':variables},
                      executor=executor)
[2021-10-27 16:11:36-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'ocean-diagnostics'
[2021-10-27 16:11:36-0600] INFO - prefect.DaskExecutor | Connecting to an existing Dask cluster at tcp://10.12.206.57:39346
/glade/work/mgrover/miniconda3/envs/cesm-collections-dev/lib/python3.9/site-packages/distributed/scheduler.py:5490: UserWarning: Scheduler already contains a plugin with name worker-status; overwriting.
warnings.warn(
[2021-10-27 16:12:03-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
list(pop_result.result.keys())
[<Task: read_catalog>,
<Task: subset_catalog>,
<Task: subset_dates>,
<Parameter: subset_times>,
<Parameter: cdf_kwargs>,
<Task: convert_to_collection>,
<Parameter: path>,
<Parameter: csv_kwargs>,
<Parameter: search_dict>,
<Task: load_data>,
<Task: long_term_mean>]
pop_result.result[long_term_average]
<Success: "Task run succeeded.">
pop_result.result[-1]
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
/glade/scratch/mgrover/ipykernel_301136/166169208.py in <module>
----> 1 pop_result.result[-1]
KeyError: -1
flow.get_tasks()
[<Parameter: path>,
<Parameter: search_dict>,
<Task: subset_dates>,
<Task: convert_to_collection>,
<Task: long_term_mean>,
<Task: annual_mean>,
<Parameter: configuration_dict>,
<Task: read_catalog>,
<Parameter: csv_kwargs>,
<Task: subset_catalog>,
<Parameter: subset_times>,
<Task: load_data>,
<Parameter: cdf_kwargs>,
<Parameter: model>,
<Task: global_mean>,
<Task: global_integral>]
output_task = flow.get_tasks()[-5]
output_task
<Task: long_term_mean>
# Repeat the workflow for the MOM catalog; the monthly BGC stream remapped
# to the POP vertical grid allows level-by-level comparison with POP output
mom_result = flow.run(path = '../data/cesm-validation-mom-pop-vertical-grid.json',
                      search_dict={'stream':'mom6.h_bgc_monthly_z',
                                   'variables':variables},
                      executor=executor)
[2021-10-27 11:28:52-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'ocean-diagnostics'
INFO:prefect.FlowRunner:Beginning Flow run for 'ocean-diagnostics'
[2021-10-27 11:28:52-0600] INFO - prefect.DaskExecutor | Connecting to an existing Dask cluster at tcp://10.12.206.39:33277
INFO:prefect.DaskExecutor:Connecting to an existing Dask cluster at tcp://10.12.206.39:33277
/glade/work/mgrover/miniconda3/envs/cesm-collections-dev/lib/python3.9/site-packages/distributed/scheduler.py:5490: UserWarning: Scheduler already contains a plugin with name worker-status; overwriting.
warnings.warn(
[2021-10-27 11:28:59-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
INFO:prefect.FlowRunner:Flow run SUCCESS: all reference tasks succeeded
# Re-open the MOM catalog (used to fetch the static grid file below)
mom_catalog = intake.open_esm_datastore('../data/cesm-validation-mom-pop-vertical-grid.json')
# Extract the long-term-mean collection from the prefect run result
mom_output = mom_result.result[output_task]._result.value
mom_ds = mom_output['ocn.mom6.h_bgc_monthly_z.mom_oob.003']
# Grid coordinates live in the separate mom6.static stream
static_ds = grab_static_data(mom_catalog)
# Attach geolon/geolat as coordinates and rename them lon/lat for plotting
mom_with_coords = xr.merge([mom_ds, static_ds]).set_coords(['geolon', 'geolat'])[variables].compute().rename({'geolon':'lon',
                                                                                                              'geolat':'lat'})
collection = pop_result.result[output_task]._result.value
# Drop the U-grid coordinates; the maps use the T-grid (TLAT/TLONG)
ds = collection['ocn.pop.h.pop_no_mcog'][variables].drop_vars(['ULAT', 'ULONG']).compute()
# Close the dateline seam, then rename coordinates to match the MOM dataset
dso = add_cyclic_point(ds).set_coords(['TLAT', 'TLONG']).rename({'TLAT':'lat',
                                                                 'TLONG':'lon'})
# Copy coordinate attrs so hvplot labels both datasets identically
mom_with_coords['lon'].attrs = dso.lon.attrs
mom_with_coords['lat'].attrs = dso.lat.attrs
# Put both models on the same vertical coordinate name and values
dso = dso.rename({'z_t':'z_pop_l'})
dso['z_pop_l'] = mom_with_coords.z_pop_l
# Interactive MOM map; z=variables adds a variable-selector widget
mom_plot = mom_with_coords.hvplot.quadmesh(x='lon',
                                           y='lat',
                                           z=variables,
                                           cmap='inferno',
                                           rasterize=True,
                                           coastline=True,
                                           projection=ccrs.PlateCarree(),
                                           title='MOM Case: mom_oob.003',
                                           project=True
                                           )
# Matching POP map with the same colormap/projection for a fair comparison
pop_plot = dso.hvplot.quadmesh(x='lon',
                               y='lat',
                               z=variables,
                               geo=True,
                               rasterize=True,
                               coastline=True,
                               cmap='inferno',
                               projection=ccrs.PlateCarree(),
                               title='POP Case: pop_no_mcog',
                               project=True
                               )
# Stack the two maps vertically with linked axes
hv.Layout([mom_plot, pop_plot]).opts(shared_axes=True).cols(1)